package com.katesoft.scale4j.rttp.jobs;

import static com.katesoft.scale4j.common.services.IBeanNameReferences.RTTP_BATCH_JOB_REPOSITORY;
import static com.katesoft.scale4j.common.services.IBeanNameReferences.RTTP_SPRING_HAZELCAST_BRIDGE;

import java.util.Collection;
import java.util.Date;
import java.util.LinkedHashSet;
import java.util.Set;

import org.apache.commons.lang.builder.ReflectionToStringBuilder;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.transaction.annotation.Transactional;

import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.ILock;
import com.katesoft.scale4j.common.services.IBeanNameReferences;
import com.katesoft.scale4j.log.LogFactory;
import com.katesoft.scale4j.log.Logger;
import com.katesoft.scale4j.rttp.client.ClusterIsNotActiveException;
import com.katesoft.scale4j.rttp.hibernate.SpringHazelcastBridge;
import com.katesoft.scale4j.rttp.internal.RttpHazelcastBridgeAwareImpl;

/**
 * Marks every job execution still recorded as running as FAILED — typically executions left
 * behind by a node that crashed before completing them. ("Crushed" in the class name appears to
 * be a historical typo for "Crashed"; the name is kept for backward compatibility.)
 * <p/>
 * NOTE: this class is designed to be used during application start-up; it runs its cleanup once
 * from {@code afterPropertiesSet()}.
 * 
 * @author kate2007
 */
public class CrushedJobsCleaner extends RttpHazelcastBridgeAwareImpl implements InitializingBean,
         ApplicationContextAware {
   /** Cluster-wide lock name prefix; one lock per job name serializes cleanup across nodes. */
   private static final String RTTP_CRUSHED_JOBS_CLEANER_LOCK = "rttp.crushed.jobNames.cleaner";
   //
   private JobExplorer jobExplorer;
   private JobRepository jobRepository;
   //
   private final Logger logger = LogFactory.getLogger(getClass());
   // names of the jobs whose stale running executions should be failed (insertion-ordered, deduped)
   private final Collection<String> jobNames = new LinkedHashSet<String>();
   // when true, every job name known to the JobExplorer is cleaned, not just the configured ones
   private boolean cleanAll = false;
   private ApplicationContext applicationContext;

   /**
    * Registers the jobs whose running executions should be cleaned.
    * 
    * @param jobs
    *           jobs to clean; may be {@code null} or empty (treated as "none"), duplicates by
    *           name are collapsed
    */
   public void setJobs(Collection<Job> jobs) {
      if (jobs != null) {
         for (Job job : jobs) {
            this.jobNames.add(job.getName());
         }
      }
   }

   /**
    * @param cleanAll
    *           if {@code true}, clean all jobs known to the {@link JobExplorer} in addition to
    *           any explicitly configured ones
    */
   public void setCleanAll(boolean cleanAll) {
      this.cleanAll = cleanAll;
   }

   @Autowired
   @Qualifier(value = IBeanNameReferences.RTTP_BATCH_JOB_EXPLORER)
   public void setJobExplorer(JobExplorer jobExplorer) {
      this.jobExplorer = jobExplorer;
   }

   @Autowired
   @Qualifier(value = RTTP_BATCH_JOB_REPOSITORY)
   public void setJobRepository(JobRepository jobRepository) {
      this.jobRepository = jobRepository;
   }

   /**
    * clean all running job executions during application start-up according to configuration.
    * <p/>
    * Method is transactional and guarantees that 2 or more different nodes would not perform db
    * updates for the same jobs: a Hazelcast distributed lock is taken per job name, and jobs
    * whose lock is already held (i.e. being cleaned by another node) are skipped.
    * 
    * @throws ClusterIsNotActiveException
    *            if hazelcast member is not active
    */
   @Transactional
   public void clean() throws ClusterIsNotActiveException {
      logger.info("cleaning all running executions for %s jobs", jobNames);
      HazelcastInstance hazelcastInstance = getBridge().getRunningInstance();

      for (String jobName : jobNames) {
         ILock lock = hazelcastInstance.getLock(RTTP_CRUSHED_JOBS_CLEANER_LOCK + "." + jobName);
         // tryLock() does not wait: if another node already owns the lock, that node is
         // cleaning this job, so this node skips it (and says so, for traceability)
         if (lock.tryLock()) {
            try {
               cleanJob(jobName);
            } finally {
               lock.unlock();
            }
         } else {
            logger.info("skipping job [%s] - another cluster node is cleaning it", jobName);
         }
      }
   }

   /**
    * clean all running job executions for given job name: each execution is marked FAILED (both
    * batch status and exit status), stamped with a shared end time, and persisted through the
    * {@link JobRepository}.
    * 
    * @param job
    *           name of the job
    */
   protected void cleanJob(String job) {
      Set<JobExecution> runningJobExecutions = jobExplorer.findRunningJobExecutions(job);
      // one timestamp for the whole sweep so all failed executions share a consistent end time
      Date endTime = new Date();
      for (JobExecution execution : runningJobExecutions) {
         execution.setStatus(BatchStatus.FAILED);
         execution.setExitStatus(ExitStatus.FAILED);
         execution.setEndTime(endTime);
         jobRepository.update(execution);
         logger.info("jobExecution [%s] updated to [%s]",
                  ReflectionToStringBuilder.reflectionToString(execution),
                  BatchStatus.FAILED.name());
      }
   }

   /**
    * Resolves any collaborators that were not injected, optionally expands the job list to every
    * known job, then performs the cleanup immediately.
    */
   @Override
   public void afterPropertiesSet() {
      // fallback lookups for wiring styles where annotation-driven injection did not run
      if (getBridge() == null) {
         setRttpHazelcastBridge((SpringHazelcastBridge) applicationContext
                  .getBean(RTTP_SPRING_HAZELCAST_BRIDGE));
      }
      if (jobExplorer == null) {
         jobExplorer = (JobExplorer) applicationContext
                  .getBean(IBeanNameReferences.RTTP_BATCH_JOB_EXPLORER);
      }
      if (jobRepository == null) {
         jobRepository = (JobRepository) applicationContext.getBean(RTTP_BATCH_JOB_REPOSITORY);
      }
      if (cleanAll) {
         jobNames.addAll(jobExplorer.getJobNames());
      }
      // NOTE(review): this is a self-invocation of a @Transactional method, so the Spring
      // transactional proxy is bypassed on this start-up path - verify that clean() really
      // runs inside a transaction here (or that one is not required during start-up)
      clean();
   }

   @Override
   public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
      this.applicationContext = applicationContext;
   }
}
